package com.jacarrichan.demo.springcloudstream.web.controller;

import com.jacarrichan.demo.springcloudstream.model.MessageVO;
import com.jacarrichan.demo.springcloudstream.stream.producer.MessageProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * Kafka producer controller, used for testing both Spring Kafka and
 * Spring Cloud Stream Kafka.
 *
 * <p>Every endpoint wraps the incoming text in a {@link MessageVO} stamped with
 * the current time and forwards it to a producer. Handlers return {@code true}
 * once the message has been handed to the producer; the outcome of the send
 * itself is not inspected here, so the value does not confirm broker delivery.
 *
 * @author JacarriChan
 * @date 2019/7/27
 */
@RestController
public class KafkaProducerController {

    private final KafkaTemplate<String, MessageVO> kafkaTemplate;

    private final MessageProducerBean messageProducerBean;

    private final String topic;

    /**
     * Creates the controller with its collaborators.
     *
     * @param kafkaTemplate       template used by the Spring Kafka endpoint
     * @param messageProducerBean producer used by the Spring Cloud Stream endpoints
     * @param topic               target topic, resolved from the {@code kafka.topic} property
     */
    @Autowired
    public KafkaProducerController(KafkaTemplate<String, MessageVO> kafkaTemplate,
                                   MessageProducerBean messageProducerBean, @Value("${kafka.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.messageProducerBean = messageProducerBean;
        this.topic = topic;
    }

    /**
     * Sends a message through the Spring Kafka {@link KafkaTemplate} to the configured topic.
     *
     * @param message message text
     * @return {@code true} after the message has been passed to the template
     */
    @PostMapping(value = "/message/send")
    public Boolean sendMessage(@RequestParam String message) {
        MessageVO msg = new MessageVO(message, new Date());
        // The value returned by send(...) is intentionally not awaited or inspected here.
        kafkaTemplate.send(topic, msg);
        return Boolean.TRUE;
    }

    /**
     * Sends a message through Spring Cloud Stream Kafka.
     *
     * @param message message text
     * @return {@code true} after the message has been passed to the producer bean
     */
    @GetMapping(value = "/message/send")
    public Boolean send(@RequestParam String message) {
        MessageVO msg = new MessageVO(message, new Date());
        messageProducerBean.send(msg);
        // Report success consistently with sendMessage (previously always returned false).
        return Boolean.TRUE;
    }

    /**
     * Sends a message through Spring Cloud Stream Kafka using the custom
     * message source (test endpoint).
     *
     * @param message message text
     * @return {@code true} after the message has been passed to the producer bean
     */
    @GetMapping(value = "/message/mySend")
    public Boolean mySend(@RequestParam String message) {
        MessageVO msg = new MessageVO(message, new Date());
        messageProducerBean.mySend(msg);
        // Report success consistently with sendMessage (previously always returned false).
        return Boolean.TRUE;
    }
}